/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.httptansaction;
import kiss.logger;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.httpmessage;
import collie.codec.http.errocode;
import collie.codec.http.httpwritebuffer;
import kiss.net;
import kiss.event;

import std.socket;
public import collie.codec.http.codec.wsframe;

/// Which side of the connection a transaction faces.
enum TransportDirection : ubyte {
    DOWNSTREAM,  // toward the client
    UPSTREAM     // toward the origin application or data
}

/**
 * Callback interface implemented by the application code that consumes a
 * single HTTPTransaction. The transaction forwards ingress events, errors
 * and egress flow-control notifications to its handler.
 */
interface HTTPTransactionHandler
{
    /**
     * Called once per transaction. This notifies the handler of which
     * transaction it should talk to and will receive callbacks from.
     */
    void setTransaction(HTTPTransaction txn);

    /**
     * Called once after a transaction successfully completes. It
     * will be called even if a read or write error happened earlier.
     * This is a terminal callback, which means that the HTTPTransaction
     * object that gives this call will be invalid after this function
     * completes.
     */
    void detachTransaction();

    /**
     * Called at most once per transaction. This is usually the first
     * ingress callback. It is possible to get a read error before this
     * however. If you had previously called pauseIngress(), this callback
     * will be delayed until you call resumeIngress().
     */
    void onHeadersComplete(HTTPMessage msg);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress().
     */
    void onBody(const ubyte[] chain);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). This signifies the beginning of a chunk of length
     * 'length'. You will receive onBody() after this. Also, the length will
     * be greater than zero.
     */
    void onChunkHeader(size_t length);

    /**
     * Can be called multiple times per transaction. If you had previously
     * called pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). This signifies the end of a chunk.
     */
    void onChunkComplete();

    /**
     * Can be called any number of times per transaction. If you had
     * previously called pauseIngress(), this callback will be delayed until
     * you call resumeIngress(). Trailers can be received once right before
     * the EOM of a chunked HTTP/1.1 response or multiple times per
     * transaction from SPDY and HTTP/2.0 HEADERS frames.
     */
    //  void onTrailers(std::unique_ptr<HTTPHeaders> trailers) noexcept
    //  = 0;

    /**
     * Can be called once per transaction. If you had previously called
     * pauseIngress(), this callback will be delayed until you call
     * resumeIngress(). After this callback is received, there will be no
     * more normal ingress callbacks received (onEgress*() and onError()
     * may still be invoked). The Handler should consider
     * ingress complete after receiving this message. This Transaction is
     * still valid, and work may still occur on it until detachTransaction
     * is called.
     */
    void onEOM();

    /**
     * Can be called at any time before detachTransaction(). This callback
     * implies that an error has occurred. To determine if ingress or egress
     * is affected, check the direction on the HTTPException. If the
     * direction is INGRESS, it MAY still be possible to send egress.
     */
    void onError(HTTPErrorCode erromsg);

    /**
     * If the remote side's receive buffer fills up, this callback will be
     * invoked so you can attempt to stop sending to the remote side.
     */
    void onEgressPaused();

    /**
     * This callback lets you know that the remote side has resumed reading
     * and you can now continue to send data.
     */
    void onEgressResumed();

    /// Receives a WebSocket frame forwarded by HTTPTransaction.onWsFrame().
    void onWsFrame(ref WSFrame wsf);

    /**
     * Receives a protocol-upgrade request forwarded by
     * HTTPTransaction.onUpgtade(); the returned flag is passed back to the
     * caller unchanged.
     */
    bool onUpgtade(CodecProtocol protocol, HTTPMessage msg);
}

/**
 * One HTTP request/response exchange. The transaction relays ingress events
 * from the session to its HTTPTransactionHandler and relays egress calls from
 * the handler to its Transport. Every relay is skipped when the corresponding
 * peer (handler or transport) is null.
 */
class HTTPTransaction
{
    /**
     * Operations the owning session exposes to a transaction: ingress flow
     * control, egress of headers/body/chunks/WebSocket data, and access to
     * connection-level information.
     */
    interface Transport
    {
        void pauseIngress(HTTPTransaction txn);

        void resumeIngress(HTTPTransaction txn);

        void transactionTimeout(HTTPTransaction txn);

        void sendHeaders(HTTPTransaction txn,
            HTTPMessage headers,
            bool eom);

        size_t sendBody(HTTPTransaction txn,
            in ubyte[],
            bool eom);

        size_t sendChunkHeader(HTTPTransaction txn,
            size_t length);

        size_t sendChunkTerminator(HTTPTransaction txn);

        size_t sendEOM(HTTPTransaction txn);

        void socketWrite(HTTPTransaction txn, StreamWriteBuffer buffer);

        //      size_t sendAbort(HTTPTransaction txn,
        //          HTTPErrorCode statusCode);

        size_t sendWsData(HTTPTransaction txn, OpCode code, ubyte[] data);
        //      size_t sendPriority(HTTPTransaction txn,
        //          const http2::PriorityUpdate& pri);
        //
        //      size_t sendWindowUpdate(HTTPTransaction txn,
        //          uint32_t bytes);

        void notifyPendingEgress();

        void detach(HTTPTransaction txn);

        //      void notifyIngressBodyProcessed(uint32_t bytes);
        //
        //      void notifyEgressBodyBuffered(int64_t bytes);

        Address getLocalAddress();

        Address getPeerAddress();

        HTTPCodec getCodec();

        bool isDraining();
    }

    /**
     * Params:
     *   direction = whether this transaction faces the client (DOWNSTREAM)
     *               or the origin (UPSTREAM)
     *   id        = codec stream identifier of this transaction
     *   seqNo     = sequence number reported by getSequenceNumber()
     */
    this(TransportDirection direction, HTTPCodec.StreamID id, uint seqNo)
    {
        // Previously the direction argument was dropped, leaving _direction at
        // its default (DOWNSTREAM) so isUpstream() could never return true.
        _direction = direction;
        _id = id;
        _seqNo = seqNo;
    }

    /// Handler receiving this transaction's callbacks (may be null).
    @property HTTPTransactionHandler handler() { return _handler; }
    /// ditto
    @property void handler(HTTPTransactionHandler han) { _handler = han; }

    /// Codec stream identifier given at construction.
    @property HTTPCodec.StreamID streamID() { return _id; }

    /// Transport used for all egress calls (may be null).
    @property void transport(Transport port) { _transport = port; }
    /// ditto
    @property Transport transport() { return _transport; }

    /// True when the transaction was created with TransportDirection.UPSTREAM.
    bool isUpstream() const {
        return _direction == TransportDirection.UPSTREAM;
    }

    /// True when the transaction was created with TransportDirection.DOWNSTREAM.
    bool isDownstream() const {
        return _direction == TransportDirection.DOWNSTREAM;
    }

    /// Sequence number given at construction.
    uint getSequenceNumber() const { return _seqNo; }

    /// Codec stream identifier given at construction.
    HTTPCodec.StreamID getID() const { return _id; }

    /// Local address as reported by the transport; the transport must be set.
    Address getLocalAddress() { return _transport.getLocalAddress(); }

    /// Peer address as reported by the transport; the transport must be set.
    Address getPeerAddress() { return _transport.getPeerAddress(); }

    /**
     * Invoked by the session when the ingress headers are complete.
     * For an upstream transaction receiving a response, the status code is
     * recorded before the handler is notified.
     */
    void onIngressHeadersComplete(HTTPMessage msg)
    {
        //  logDebug("onIngressHeadersComplete handle is ", (handler is null));
        if (isUpstream() && msg.isResponse()) {
            _lastResponseStatus = msg.statusCode;
        }
        if (_handler)
            _handler.onHeadersComplete(msg);
    }

    /**
     * Invoked by the session when some or all of the ingress entity-body has
     * been parsed. The padding argument is currently not used.
     */
    void onIngressBody(const ubyte[] chain, ushort padding)
    {
        if (_handler)
            _handler.onBody(chain);
    }

    /**
     * Invoked by the session when a chunk header has been parsed.
     */
    void onIngressChunkHeader(size_t length)
    {
        if (_handler)
            _handler.onChunkHeader(length);
    }

    /**
     * Invoked by the session when the CRLF terminating a chunk has been parsed.
     */
    void onIngressChunkComplete()
    {
        if (_handler)
            _handler.onChunkComplete();
    }

    /**
     * Invoked by the session when the ingress message is complete.
     */
    void onIngressEOM()
    {
        if (_handler)
            _handler.onEOM();
    }

    /**
     * Forwards an error code to the handler's onError().
     */
    void onErro(HTTPErrorCode erro)
    {
        if (_handler)
            _handler.onError(erro);
    }

    /**
     * Schedule or refresh the timeout for this transaction.
     * Currently a no-op.
     */
    void refreshTimeout() {}

    /**
     * Timeout callback for this transaction. The timer is meant to be active
     * until the ingress message is complete or terminated by error.
     * Currently a no-op.
     */
    void timeoutExpired() {}

    /**
     * Send the egress message headers to the Transport. This method does
     * not actually write the message out on the wire immediately. All
     * writes happen at the end of the event loop at the earliest.
     * Note: This method should be called once per message unless the first
     * headers sent indicate a 1xx status.
     *
     * sendHeaders will not set EOM flag in header frame, whereas
     * sendHeadersWithEOM will. sendHeadersWithOptionalEOM backs both of them.
     *
     * Params:
     *   headers = Message headers
     */
    void sendHeaders(HTTPMessage headers)
    {
        sendHeadersWithOptionalEOM(headers, false);
    }

    /// ditto
    void sendHeadersWithEOM(HTTPMessage headers)
    {
        sendHeadersWithOptionalEOM(headers, true);
    }

    /// ditto
    void sendHeadersWithOptionalEOM(HTTPMessage headers, bool eom)
    {
        if (transport)
            transport.sendHeaders(this, headers, eom);
    }

    /**
     * Send part or all of the egress message body to the Transport. If flow
     * control is enabled, the chunk boundaries may not be respected.
     * This method does not actually write the message out on the wire
     * immediately. All writes happen at the end of the event loop at the
     * earliest.
     * Note: This method may be called zero or more times per message.
     *
     * Params:
     *   body_ = Message body data; the Transport will take care of
     *           applying any necessary protocol framing, such as
     *           chunk headers.
     *   iseom = passed to the Transport as the end-of-message flag
     */
    void sendBody(in ubyte[] body_, bool iseom = false) {
        if (transport)
            transport.sendBody(this, body_, iseom);
    }

    /**
     * Write any protocol framing required for the subsequent call(s)
     * to sendBody(). This method does not actually write the message out on
     * the wire immediately. All writes happen at the end of the event loop
     * at the earliest.
     *
     * Params:
     *   length = Length in bytes of the body data to follow.
     */
    void sendChunkHeader(size_t length) {
        if (transport)
            transport.sendChunkHeader(this, length);
    }

    /**
     * Hands a write buffer to the Transport's socketWrite().
     */
    void socketWrite(StreamWriteBuffer buffer) {
        if (transport)
            transport.socketWrite(this, buffer);
    }

    /**
     * Write any protocol syntax needed to terminate the data. This method
     * does not actually write the message out on the wire immediately. All
     * writes happen at the end of the event loop at the earliest.
     * Frame begun by the last call to sendChunkHeader().
     */
    void sendChunkTerminator() {
        if (transport)
            transport.sendChunkTerminator(this);
    }

    /**
     * Finalize the egress message; depending on the protocol used
     * by the Transport, this may involve sending an explicit "end
     * of message" indicator. This method does not actually write the
     * message out on the wire immediately. All writes happen at the end
     * of the event loop at the earliest.
     *
     * If the ingress message also is complete, the transaction may
     * detach itself from the Handler and Transport and delete itself
     * as part of this method.
     *
     * Note: Either this method or sendAbort() should be called once
     * per message.
     */
    void sendEOM() {
        if (transport)
            transport.sendEOM(this);
    }

    /**
     * Sends a "408 Request Timeout" response with "Connection: close" and
     * the end-of-message flag set. Does nothing when no transport is attached.
     */
    void sendTimeOut()
    {
        if (!transport) return;
        import collie.codec.http.headers;
        // NOTE(review): msg is scope-allocated, so it is destroyed when this
        // function returns; this presumes Transport.sendHeaders() does not
        // keep a reference to it - confirm against the session implementation.
        scope HTTPMessage msg = new HTTPMessage();
        msg.statusCode(408);
        msg.statusMessage("Request Timeout");
        msg.getHeaders.add(HTTPHeaderCode.CONNECTION, "close");
        sendHeadersWithEOM(msg);
    }

    /**
     * Sends WebSocket data with the given opcode through the Transport.
     */
    void sendWsData(OpCode code, ubyte[] data)
    {
        if (transport)
            transport.sendWsData(this, code, data);
    }

    /**
     * Forwards a received WebSocket frame to the handler.
     */
    void onWsFrame(ref WSFrame wsf) {
        logDebug(".....");
        if (_handler)
            _handler.onWsFrame(wsf);
    }

    /**
     * Forwards a protocol-upgrade request to the handler.
     *
     * Returns: the handler's answer, or false when no handler is attached.
     */
    bool onUpgtade(CodecProtocol protocol, HTTPMessage msg) {
        if (_handler)
            return _handler.onUpgtade(protocol, msg);

        return false;
    }

package:
    /**
     * Detaches the transaction from its handler and its transport, clearing
     * both references. The `deleting` flag makes repeated or re-entrant calls
     * return immediately.
     */
    void onDelayedDestroy()
    {
        //  logDebug("deleting is ", deleting);
        if (deleting) return;
        deleting = true;
        if (_handler) {
            _handler.detachTransaction();
            _handler = null;
        }
        if (_transport) {
            _transport.detach(this);
            _transport = null;
        }
    }

private:
    HTTPCodec.StreamID _id;
    Transport _transport;
    HTTPTransactionHandler _handler;
    TransportDirection _direction;
    uint _seqNo;

    // Set once onDelayedDestroy() has started.
    bool deleting = false;

private:
    // Status code of the most recent response seen by an upstream transaction.
    ushort _lastResponseStatus;
}